[PoC] Queue notification to optimize polling #257

Draft · wants to merge 4 commits into master

Conversation

@neob91-close (Contributor) commented Dec 15, 2022:

This PR is a PoC for an optimised way to poll queues. Tests were deliberately omitted at this stage.

The way it works is that the key t:queued_cache_token:<queue root> (I welcome better naming here) is updated with a random token whenever a task is added to a queue under that root.

The workers will monitor the tokens for the queues they're interested in and perform the expensive queue poll only when any of them change (or when an hour has elapsed since the last poll, as a safeguard).
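
As a rough sketch of the mechanism described above (illustrative only; the Redis client, key layout, and helper names are my assumptions, not this PR's actual code):

import secrets
import time

import redis

client = redis.Redis()

def token_key(queue: str) -> str:
    # Key layout from the description above: t:queued_cache_token:<queue root>
    return "t:queued_cache_token:" + queue.split(".", 1)[0]

def on_task_enqueued(queue: str) -> None:
    # Overwrite the root token with a fresh random value on every enqueue,
    # so that watching workers can cheaply detect activity.
    client.set(token_key(queue), secrets.token_hex())

def expensive_queue_poll() -> None:
    ...  # stand-in for the actual (expensive) queue scan

def worker_loop(queues: list, forced_poll_interval: float = 3600.0) -> None:
    last_tokens: tuple = ()
    next_forced_poll = 0.0
    while True:
        # Cheap check: fetch all watched tokens in one round trip.
        tokens = tuple(client.mget([token_key(q) for q in queues]))
        now = time.monotonic()
        if tokens != last_tokens or now >= next_forced_poll:
            expensive_queue_poll()
            last_tokens = tokens
            next_forced_poll = now + forced_poll_interval
        time.sleep(1)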

@neob91-close force-pushed the queue-notification-to-optimize-polling branch from 4642deb to f9e6f13 on December 15, 2022 12:50
@neob91-close changed the title from "Queue notification to optimize polling" to "[PoC] Queue notification to optimize polling" on Dec 16, 2022
@nsaje (Contributor) left a comment:

An interesting idea, and it looks promising! Regarding naming, queue_token is indeed a bit too general to tell what it's for without reading the code. I'm not entirely sure about this, but what about queue_etag? It functions the same way as HTTP's ETag, so I'm counting on the association here.

client.set(
    self._key("queue_token", queue.split(".", 1)[0]),
    secrets.token_hex(),
    ex=3600 * 24 * 30,  # To avoid leftover keys as queues change
)
Member commented:

Maybe make it configurable in self.config?


if self._should_poll_for_queues():
    self._refresh_queue_set()
    self._next_forced_queue_poll = time.monotonic() + 3600
Member commented:

Maybe make the 3600 configurable in TaskTiger.config?

@neob91-close (Contributor Author) replied:

This needs to be less than or equal to the token key expiry, so I think we should use the configuration option for that here. See #257 (comment) for more details as to why.

Member replied:

> This needs to be less than or equal to the token key expiry,

Why?

self._key("queue_token", queue) for queue in self.only_queues
)

return ":".join(map(str, self.connection.mget(keys)))
Member commented:

Since we don't need this to be serializable, we could also just return it as a tuple and compare the tuples. The initial _queue_set_token could just be an empty tuple (or None).
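
A sketch of the tuple-returning variant being suggested here (illustrative; it assumes the method shown in the next snippet):

def _get_queue_set_token(self) -> tuple:
    keys = sorted(
        self._key("queue_token", queue) for queue in self.only_queues
    )
    # Tuples compare element-wise, and None entries (missing keys) compare
    # fine too, so there is no need to stringify the mget result.
    return tuple(self.connection.mget(keys))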


def _get_queue_set_token(self) -> str:
    keys = sorted(
        self._key("queue_token", queue) for queue in self.only_queues
Member commented:

  • What if we don't use only_queues? Would we then only poll every hour?
  • The way it is currently implemented, this wouldn't work if we process a sub-queue (e.g. "foo.bar"), right?

@neob91-close (Contributor Author) replied:

If we don't use only_queues, we'd poll the same way we do now (every POLL_TASK_QUEUES_INTERVAL).

@neob91-close (Contributor Author) replied:

> What if we don't use only_queues? Would we then only poll every hour?
> The way it is currently implemented, this wouldn't work if we process a sub-queue (e.g. "foo.bar"), right?

Correct, but all we need to do is take the root of each queue from self.only_queues to get it to work.
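
For illustration, the adjustment might look something like this (a sketch, not code from the PR):

# "foo.bar" and "foo.baz" would both map to the "foo" token key:
roots = {queue.split(".", 1)[0] for queue in self.only_queues}
keys = sorted(self._key("queue_token", root) for root in roots)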

Member replied:

> If we don't use only_queues, we'd poll the same way we do now (every POLL_TASK_QUEUES_INTERVAL)

Ah, because _should_poll_for_queues returns True if not self.only_queues.

client.publish(self._key("activity"), queue)

client.set(
    self._key("queue_token", queue.split(".", 1)[0]),
Member commented:

So this optimization essentially only works for the top level queue name. Would there be an issue if we set the queue token for each level? For example, if the queue is "foo.bar.baz" we set the queue token for "foo.bar.baz", "foo.bar", "foo", and possibly "" (root/top level token for a client that processes all queues). That way, a client with only_queues would use the specific queues as queue token keys in _get_queue_set_token, and a client without only_queues would look at the root token only.
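
A sketch of the per-level keys this would imply (illustrative helper, not from the PR):

def queue_token_levels(queue: str) -> list:
    # "foo.bar.baz" -> ["foo.bar.baz", "foo.bar", "foo", ""]
    parts = queue.split(".")
    levels = [".".join(parts[:i]) for i in range(len(parts), 0, -1)]
    levels.append("")  # root token for clients that process all queues
    return levels

# On enqueue, a token would then be set for every level, e.g.:
# for level in queue_token_levels(queue):
#     client.set(self._key("queue_token", level), token, ex=expiry)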

@neob91-close (Contributor Author) replied:

I considered this initially.

The first draft of this feature used the empty root "" to detect changes across all queues, but I removed it because I didn't believe it would be that useful (we don't care much about polling frequency when there are very few subqueues, and it probably wouldn't help much when there are a lot).

I see no harm in bringing it back though as it could prove beneficial under certain rare conditions.

Regarding allowing subqueues (e.g. foo.bar), I wanted to do just that, but then realised that it could potentially have a large memory footprint under certain conditions. We could lower the memory footprint by using a relatively low value for key expiry.

However, the forced poll interval must be less than or equal to the expiry time of the token keys, and I would like the forced poll interval to be 30-60 min. Its main goal is to mitigate a scenario where the token key is empty for two subsequent checks performed by the worker, yet without the worker's knowledge it was set to some value and then expired between those two checks (e.g. because the worker was busy working on something before the second check). That means we'd basically be keeping a key for every subqueue touched within the last hour.

I'd like to play it safe and only use root for now. Let's consider subqueue tokens in the next iteration?

@neob91-close (Contributor Author) added:

Obviously if we don't use subqueues for this optimisation, the logic needs to be adjusted to take the root of each queue from self.only_queues.

Member replied:

> it could potentially have a large memory footprint under certain conditions

What specific footprint are you concerned about? Redis?

I'm okay with doing this later and just sticking to the root queue name; it's just that I originally designed the queueing to be independent of how many subqueues you're using. E.g. switching queues from billing.<org_id> to billing.usage.<org_id> and billing.invoices.<org_id> shouldn't have a performance impact. But I'll leave it up to you.

@neob91-close (Contributor Author) replied:

> What specific footprint are you concerned about? Redis?

Yes. We'd need to store a token for every single subqueue, of which there could be many (I don't think it's unreasonable to expect up to as many subqueues as there are tasks). And these keys would stick around for a while. I'm worried we might be trading the CPU issues this is supposed to fix for memory issues.
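
For a rough sense of scale (my own back-of-the-envelope numbers, not from the discussion): secrets.token_hex() defaults to 32 random bytes, i.e. a 64-character value, so with the key name and Redis per-key overhead each token plausibly costs a few hundred bytes; a million live subqueue keys would then be on the order of a few hundred megabytes.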

@neob91-close (Contributor Author) added:

With the current design, the token key needs to be kept for at least as long as the forced poll interval implemented in this PR.

That's because we want to avoid a scenario where:

  • the last token value retrieved by the worker is empty
  • a worker has been busy processing multiple tasks for so long that the key is updated, and then expires again
  • by the time the worker is done, it sees that the key is empty again, so it does nothing

And I'd like the forced poll interval to be as long as possible to avoid unnecessary polls.
It's only supposed to mitigate the issue with key expiration.
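
A sketch of how the safeguard fits together, consistent with the snippets above (the full method isn't shown in this thread, so parts of this are assumed):

import time

def _should_poll_for_queues(self) -> bool:
    if not self.only_queues:
        # Without only_queues, fall back to regular interval polling.
        return True
    if time.monotonic() >= self._next_forced_queue_poll:
        # Forced poll: covers the case where a token was set and then
        # expired between two checks without the worker observing it.
        return True
    return self._get_queue_set_token() != self._queue_set_token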

Member replied:

> a worker has been busy processing multiple tasks for so long that the key is updated, and then expires again

Could the workers update the token after every task?

@neob91-close (Contributor Author) replied on Dec 20, 2022:

They could, but it got me thinking, and I realised that if we forced a token update every key expiry interval (instead of forcing the poll) and reset the forced poll timer on every token update, we'd solve the problem, and we could lower the token key expiry without the polling frequency increasing linearly with it.
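
If I read the adjustment right, the worker side might end up looking roughly like this (a speculative sketch of the described change, not the pushed code; _poll_if_needed and _forced_poll_interval are hypothetical names, and `import time` is assumed):

def _poll_if_needed(self) -> None:
    tokens = self._get_queue_set_token()
    if (
        tokens != self._queue_set_token
        or time.monotonic() >= self._next_forced_queue_poll
    ):
        self._refresh_queue_set()
        self._queue_set_token = tokens
        # Resetting the deadline whenever a token change is observed means
        # the forced poll only fires when a token could have been set and
        # then expired unseen, so the key expiry can be lowered without the
        # polling frequency growing linearly with it.
        self._next_forced_queue_poll = (
            time.monotonic() + self._forced_poll_interval
        )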

@neob91-close (Contributor Author) added:

I'll push the adjustments today.


@vtclose removed the request for review from tsx on December 19, 2022 16:20
@neob91-close force-pushed the queue-notification-to-optimize-polling branch from f450642 to 507d72f on December 20, 2022 10:29
@neob91-close force-pushed the queue-notification-to-optimize-polling branch from 95f2fc3 to c0751b3 on December 20, 2022 10:57